feat!: use gossipsub for consensus broadcasts #1156
Conversation
Test Results (CI): 570 tests ±0, 569 ✅ ±0, 3h 14m 37s ⏱️ (-21m 42s). Results for commit ba31310. Comparison against base commit 3ecc671.
LGTM, will test when we are out of draft.
For performance, and to avoid issues with incorrect decoding, I think we need to use the topic to determine which message type we have.
networking/core/src/worker.rs
.gossip_message_codec
// the incoming gossip message is a transaction
if let Ok((length, msg)) = self
    .transaction_gossip_message_codec
Maybe we should inspect the topic prefix rather than trying to decode and failing, as decoding might be slow or even erroneously succeed for the wrong message type (protobuf fields are optional and unknown fields are ignored).
We could "register" channels for each topic or topic prefix.
e.g.
networking.gossipsub_subscribe("consensus-0-31", codec).await?;
or maybe register a channel for sending/receiving messages on the topic
let (channel1, channel2) = message_channel::channel::<HotstuffMessage>(100);
networking.gossipsub_subscribe("consensus-0-31", channel1).await?;
Channel could be something like this
pub fn channel<T>(capacity: usize) -> (MessageChannel<T>, MessageChannel<T>) {
let (ltr_tx, ltr_rx) = mpsc::channel(capacity);
let (rtl_tx, rtl_rx) = mpsc::channel(capacity);
(
MessageChannel {
sender: ltr_tx,
receiver: rtl_rx,
},
MessageChannel {
sender: rtl_tx,
receiver: ltr_rx,
},
)
}
#[derive(Debug)]
pub struct MessageChannel<T> {
sender: mpsc::Sender<T>,
receiver: mpsc::Receiver<T>,
}
impl<T> MessageChannel<T> {
pub async fn recv(&mut self) -> Option<T> {
self.receiver.recv().await
}
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.receiver.try_recv()
}
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
self.sender.send(value).await
}
pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
self.sender.try_send(value)
}
pub fn split(self) -> (mpsc::Receiver<T>, mpsc::Sender<T>) {
(self.receiver, self.sender)
}
}
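To make the pairing concrete, here is a simplified, synchronous sketch of the proposed bidirectional channel: two one-way channels crossed so each half can both send and receive. It uses `std::sync::mpsc` purely so the example runs without a tokio dependency; the actual proposal above uses tokio's async channels.

```rust
use std::sync::mpsc;

// Each half holds the sender of one channel and the receiver of the other.
pub struct MessageChannel<T> {
    pub sender: mpsc::Sender<T>,
    pub receiver: mpsc::Receiver<T>,
}

pub fn channel<T>() -> (MessageChannel<T>, MessageChannel<T>) {
    let (ltr_tx, ltr_rx) = mpsc::channel(); // left-to-right
    let (rtl_tx, rtl_rx) = mpsc::channel(); // right-to-left
    (
        MessageChannel { sender: ltr_tx, receiver: rtl_rx },
        MessageChannel { sender: rtl_tx, receiver: ltr_rx },
    )
}
```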
What might make it easier is to do the en/decoding in the subscription handle and have the networking layer simply send the gossipsub::Message to the handle. This would likely remove the need for the MessageSpec trait for gossip.
// Client code
// Set some options for the subscription
let opts = GossipSubscriptionOpts { max_message_length: 1024 * 1024 };
// Subscribe - that sets up a channel for sending/receiving
let (sender, receiver) = networking.gossipsub_subscribe("consensus-0-31", opts).await?;
// Receiver goes to inbound messaging
let msg = receiver.next_message().await?;
// Sender goes to outbound messaging
sender.send(msg).await?;
// In networking
pub async fn gossipsub_subscribe<T: prost::Message>(&self, topic: Topic, opts: GossipSubscriptionOpts) -> Result<...> {
let (reply, reply_rx) = oneshot::channel();
// Unbounded because of possible deadlock (consensus is waiting to send to networking, networking is waiting for consensus to read off the channel)
let (sender, receiver) = mpsc::unbounded_channel();
self.request_tx.send(NetworkRequest::GossipsubSubscribe {
topic,
receiver,
opts,
reply, // Reply<mpsc::Receiver<(PeerId, gossipsub::Message)>>
}).await?;
let receiver = reply_rx.await?;
Ok((
GossipSender::<T>::new(sender),
GossipReceiver::<T>::new(receiver),
))
}
// The subscription types
#[derive(Debug, Clone)]
struct GossipSender<T> {
sender: mpsc::UnboundedSender<Vec<u8>>,
codec: ProstCodec<T>,
}
impl<T: prost::Message> GossipSender<T> {
pub async fn send(&self, msg: T) -> Result<(), MessageError> {
let len = msg.encoded_len();
let mut buf = Vec::with_capacity(len);
self.codec
.encode_to(&mut buf, msg)
.await?;
self.sender.send(buf)?;
Ok(())
}
}
#[derive(Debug)]
struct GossipReceiver<T> {
receiver: mpsc::Receiver<(PeerId, gossipsub::Message)>,
codec: ProstCodec<T>,
}
impl<T: prost::Message> GossipReceiver<T> {
pub async fn next_message(&mut self) -> Result<Option<(PeerId, T)>, MessageError> {
let Some((source, raw_msg)) = self.receiver.recv().await else {
return Ok(None);
};
let (_len, msg) = self.codec.decode_from(&mut raw_msg.data.as_slice()).await?;
Ok(Some((source, msg)))
}
}
The message validation (report_message_validation_result) in networking will accept the message as long as the message length in bytes is not exceeded.
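The acceptance rule boils down to a single length check. A self-contained, illustrative-only sketch (the enum mirrors gossipsub's `MessageAcceptance` but is defined locally here; `validate_gossip_message` is a hypothetical name, not the crate's API):

```rust
// Accept the raw message if and only if its byte length does not
// exceed the configured maximum; otherwise reject it so it is not
// propagated further.
#[derive(Debug, PartialEq)]
pub enum Acceptance {
    Accept,
    Reject,
}

pub fn validate_gossip_message(data: &[u8], max_message_length: usize) -> Acceptance {
    if data.len() <= max_message_length {
        Acceptance::Accept
    } else {
        Acceptance::Reject
    }
}
```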
applications/tari_validator_node/src/p2p/services/messaging/outbound.rs
LGTM - tested and works
552e23f
* development:
  feat: consistent transaction abort reason (tari-project#1172)
  feat(swarm): auto mining (tari-project#1174)
  fix(wallet_web_ui): invalid resource error on transfers (tari-project#1175)
  chore(deps): bump rustsec/audit-check from 1.4.1 to 2.0.0 (tari-project#1164)
  feat!: use gossipsub for consensus broadcasts (tari-project#1156)
  fix: syncing from epoch 0 before consensus has started (tari-project#1167)
  chore: moved fee exhaust constant to consensus constants (tari-project#1166)
  chore(deps): bump rollup from 3.29.4 to 3.29.5 in /applications/tari_indexer_web_ui (tari-project#1159)
  chore(deps-dev): bump vite from 4.5.3 to 4.5.5 in /applications/tari_validator_node_web_ui (tari-project#1163)
  fix: fixing integration tests (tari-project#1165)
Description

- A registry (`tx_gossip_messages_by_topic`) that maps topic prefixes to a channel. On startup we register a channel for mempool (topic prefix `transactions`) and another for consensus (topic prefix `consensus`). Then, on gossip message received, the networking layer simply relays it to the appropriate channel based on the topic prefix. This design allows us to easily use gossip for other purposes in the future.
- A new `ConsensusGossipService` in the Validator node that listens for epoch events and subscribes to the appropriate gossip topics. It also does message encoding/decoding.
- Updated the `MempoolGossip` module to adapt it to the new gossip design, by implementing message encoding/decoding and receiving the messages from networking.
- `ConsensusInboundMessaging` now also listens to consensus messages coming from the new `ConsensusGossipService`.
- `ConsensusOutboundMessaging` uses the new `ConsensusGossipService` for broadcasting.
- The `OutboundMessaging` trait's `multicast` function now expects a `ShardGroup` instead of a committee.

Motivation and Context
Hotstuff and Cerberus are message-based protocols. Currently we implement a messaging protocol that requires nodes to connect to every other node in the local shard. For cross-shard messaging, we implement a strategy that limits the number of messages sent but relies on multiple connections per peer across shards.
We want to leverage libp2p's gossipsub for all consensus broadcasts to local/foreign shards.
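The topic-prefix registry described in the Description above can be sketched roughly as follows. Names and types are illustrative only (the real registry uses tokio channels and the networking layer's own message types); `std::sync::mpsc` is used here so the example is dependency-free.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Relay a received gossip message to the channel registered for the
// first matching topic prefix; returns false if no prefix matches
// or the registered channel is closed.
pub fn dispatch(
    registry: &HashMap<String, mpsc::Sender<Vec<u8>>>,
    topic: &str,
    data: Vec<u8>,
) -> bool {
    registry
        .iter()
        .find(|(prefix, _)| topic.starts_with(prefix.as_str()))
        .map(|(_, tx)| tx.send(data).is_ok())
        .unwrap_or(false)
}
```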
Consensus topics are named `consensus-{start}-{end}` (`start` and `end` are the start/end shards in the `ShardGroup` type, similar to the mempool service).

How Has This Been Tested?
Manually, by starting a local network using `tari_spawn`, performing transactions and inspecting the logs.

What process can a PR reviewer use to test or verify this change?
See previous section
Breaking Changes